package rabbit

import (
	"github.com/streadway/amqp"
	"log"
)

type RabbitMQ struct {
	conn *amqp.Connection
	channel *amqp.Channel
	Exchange string  //exchange name
	Queue string //queue name
	Url string //mq url
	Key string //bind key
}
//create simple type rabbitmq
func NewRabbitMQ(url string,exchange string,queue string,key string) (*RabbitMQ,error) {
	r :=&RabbitMQ{
		Exchange: exchange,
		Queue:    queue,
		Url:      url,
		Key:      key,
	}
	var err error
	r.conn,err=amqp.Dial(url)
	if err!=nil{
		return r,err
	}
	r.channel,err=r.conn.Channel()
	return r,err
}

func  NewRabbitMQWithQueue(url string,queue string) (*RabbitMQ,error) {
	var err error
	r,err:=NewRabbitMQ(url,"",queue,"")
	return r,err
}
//订阅模式创建RabbitMQ实例
func NewRabbitMQWithExchange(url string,exchangeName string) (*RabbitMQ,error) {
	var err error
	r,err:=NewRabbitMQ(url,exchangeName,"","")
	return r,err
}
//订阅模式生产
func (r *RabbitMQ) PublishPub(msg string) error{
	err:=r.channel.ExchangeDeclare(
		r.Exchange,
		"fanout",
		true,
		false,
		//true表示这个exchange不可以被client用来推送消息，仅用来进行exchange和exchange之间的绑定
		false,
		false,
		nil,
		)
	if err!=nil{
		return err
	}
	err=r.channel.Publish(
		r.Exchange,
		"",
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(msg),
		})
	return err
}
//订阅模式消费端代码
func (r *RabbitMQ) RecieveSub(do func(m <-chan amqp.Delivery)) error{
	err:=r.channel.ExchangeDeclare(
		r.Exchange,
		"fanout",
		true,
		false,
		//true表示这个exchange不可以被client用来推送消息，仅用来进行exchange和exchange之间的绑定
		false,
		false,
		nil,
	)
	if err!=nil{
		return err
	}
	q,err:=r.channel.QueueDeclare(
		"",
		false,
		false,
		true,
		false,
		nil,
		)
	if err!=nil{
		return err
	}
	err=r.channel.QueueBind(q.Name,"",r.Exchange,false,nil)
	if err!=nil{
		return err
	}
	msgs,err:=r.channel.Consume(q.Name,"",true,false,false,false,nil)
	forever := make(chan bool)
	go do(msgs)
	<-forever
	return nil
}
//simple type publish msg
func (r *RabbitMQ) PublishSimple(msg string) error {
	//1.申请队列，如果队列不存在会自动创建，存在则跳过创建
	_,err:=r.channel.QueueDeclare(r.Queue,
		//是否持久化
		false,
		//是否自动删除
		false,
		//是否具有排他性
		false,
		//是否阻塞处理
		false,
		//额外的属性
		nil,
	)
	if err!=nil{
		return err
	}
	err=r.channel.Publish(
		r.Exchange,
		r.Key,
		//如果为true，根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
		false,
		//如果为true，当exchange发送消息到队列后发现队列上没有消费者，则会把消息返还给发送者
		false,
		amqp.Publishing{
			ContentType:     "text/plain",
			Body:            []byte(msg),
		},
	)
	return err
}

func (r *RabbitMQ) ConsumeSimple(do func(m <-chan amqp.Delivery)) (err error) {
	//1.申请队列，如果队列不存在会自动创建，存在则跳过创建
	q, err := r.channel.QueueDeclare(
		r.Queue,
		//是否持久化
		false,
		//是否自动删除
		false,
		//是否具有排他性
		false,
		//是否阻塞处理
		false,
		//额外的属性
		nil,
	)
	if err != nil {
		return
	}
	//接收消息
	msgs, err := r.channel.Consume(
		q.Name, // queue
		//用来区分多个消费者
		"", // consumer
		//是否自动应答
		true, // auto-ack
		//是否独有
		false, // exclusive
		//设置为true，表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
		false, // no-local
		//列是否阻塞
		false, // no-wait
		nil,   // args
	)
	if err != nil {
		return
	}
	//forever := make(chan bool)
	go do(msgs)
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	//<-forever
	return
}

//路由模式
//创建RabbitMQ实例
func NewRabbitMQWithExchangeRoutingKey(url string,exchangeName string, routingKey string) (*RabbitMQ,error){
	var err error
	r,err:=NewRabbitMQ(url,exchangeName,"",routingKey)
	return r,err
}

//路由模式发送消息
func (r *RabbitMQ) PublishRouting(message string) error{
	//1.尝试创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		//要改成direct
		"direct",
		true,
		false,
		false,
		false,
		nil,
	)

	if err!=nil{
		return err
	}
	//2.发送消息
	err = r.channel.Publish(
		r.Exchange,
		//要设置
		r.Key,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
	return err
}
//路由模式接受消息
func (r *RabbitMQ) RecieveRouting(do func(m <-chan amqp.Delivery)) error{
	//1.试探性创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		//交换机类型
		"direct",
		true,
		false,
		false,
		false,
		nil,
	)
	if err!=nil{
		return err
	}
	//2.试探性创建队列，这里注意队列名称不要写
	q, err := r.channel.QueueDeclare(
		"", //随机生产队列名称
		false,
		false,
		true,
		false,
		nil,
	)
	if err!=nil{
		return err
	}
	err=r.channel.QueueBind(q.Name,r.Key,r.Exchange,false,nil)
	if err!=nil{
		return err
	}
	//消费消息
	messges, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)

	forever := make(chan bool)

	go do(messges)
	<-forever
	return nil
}

//话题模式
//话题模式发送消息
func (r *RabbitMQ) PublishTopic(message string) error {
	//1.尝试创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		//要改成topic
		"topic",
		true,
		false,
		false,
		false,
		nil,
	)
	if err!=nil{
		return err
	}
	//2.发送消息
	err = r.channel.Publish(
		r.Exchange,
		//要设置
		r.Key,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
	return err
}

//话题模式接受消息
//要注意key,规则
//其中“*”用于匹配一个单词，“#”用于匹配多个单词（可以是零个）
//匹配 kuteng.* 表示匹配 kuteng.hello, kuteng.hello.one需要用kuteng.#才能匹配到
func (r *RabbitMQ) RecieveTopic(do func(m <-chan amqp.Delivery)) error {
	//1.试探性创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		//交换机类型
		"topic",
		true,
		false,
		false,
		false,
		nil,
	)
	if err!=nil{
		return err
	}
	//2.试探性创建队列，这里注意队列名称不要写
	q, err := r.channel.QueueDeclare(
		"", //随机生产队列名称
		false,
		false,
		true,
		false,
		nil,
	)
	if err!=nil{
		return err
	}
	//绑定队列到 exchange 中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下，这里的key要为空
		r.Key,
		r.Exchange,
		false,
		nil)
	if err!=nil{
		return err
	}
	//消费消息
	messges, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	if err!=nil{
		return err
	}
	go do(messges)
	return nil
}

